package com.example.tolerant;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 这里主要演示：容错处理的代码步骤
 */
public class StateBackEnd2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //只有开启checkpoint 才会有重启策略
        env.enableCheckpointing(8000L);

        //设置重启策略为2次，间隔2秒--理解上是最多支持2次异常的重启。第二次后就挂了程序退出。疑问，另外几种重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2));

        //设置备份策略为本地文件系统
        String checkpointDataUri = "file:///E:\\Gitee\\UserBehaviorAnalysis\\FaultTolerant\\statebackend\\";
        //https://blog.csdn.net/Jiny_li/article/details/86419995
        env.setStateBackend(new FsStateBackend(checkpointDataUri));

        //设置canceljob 或者异常退出JOB以后不要清除checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("111.230.113.15", 9090);
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                if ("jerry".equals(value)) {
                    throw new RuntimeException("输入异常触发词 jerry");
                }
                return new Tuple2<String, Integer>(value, 1);
            }
        }).keyBy(0).sum(1);
        outputStreamOperator.print("单词统计");
        env.execute("容错恢复测试");
    }
}
